1   package org.apache.lucene.replicator;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements.  See the NOTICE file distributed with
6    * this work for additional information regarding copyright ownership.
7    * The ASF licenses this file to You under the Apache License, Version 2.0
8    * (the "License"); you may not use this file except in compliance with
9    * the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
20  import java.io.Closeable;
21  import java.io.IOException;
22  import java.util.HashMap;
23  import java.util.concurrent.Callable;
24  import java.util.concurrent.atomic.AtomicInteger;
25  
26  import org.apache.lucene.document.Document;
27  import org.apache.lucene.index.DirectoryReader;
28  import org.apache.lucene.index.IndexWriter;
29  import org.apache.lucene.index.IndexWriterConfig;
30  import org.apache.lucene.index.SnapshotDeletionPolicy;
31  import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
32  import org.apache.lucene.replicator.ReplicationClient.SourceDirectoryFactory;
33  import org.apache.lucene.store.Directory;
34  import org.apache.lucene.store.MockDirectoryWrapper;
35  import org.apache.lucene.util.IOUtils;
36  import org.apache.lucene.util.TestUtil;
37  import org.apache.lucene.util.ThreadInterruptedException;
38  import org.junit.After;
39  import org.junit.Before;
40  import org.junit.Test;
41  
42  public class IndexReplicationClientTest extends ReplicatorTestCase {
43    
44    private static class IndexReadyCallback implements Callable<Boolean>, Closeable {
45      
46      private final Directory indexDir;
47      private DirectoryReader reader; 
48      private long lastGeneration = -1;
49      
50      public IndexReadyCallback(Directory indexDir) throws IOException {
51        this.indexDir = indexDir;
52        if (DirectoryReader.indexExists(indexDir)) {
53          reader = DirectoryReader.open(indexDir);
54          lastGeneration = reader.getIndexCommit().getGeneration();
55        }
56      }
57      
58      @Override
59      public Boolean call() throws Exception {
60        if (reader == null) {
61          reader = DirectoryReader.open(indexDir);
62          lastGeneration = reader.getIndexCommit().getGeneration();
63        } else {
64          DirectoryReader newReader = DirectoryReader.openIfChanged(reader);
65          assertNotNull("should not have reached here if no changes were made to the index", newReader);
66          long newGeneration = newReader.getIndexCommit().getGeneration();
67          assertTrue("expected newer generation; current=" + lastGeneration + " new=" + newGeneration, newGeneration > lastGeneration);
68          reader.close();
69          reader = newReader;
70          lastGeneration = newGeneration;
71          TestUtil.checkIndex(indexDir);
72        }
73        return null;
74      }
75      
76      @Override
77      public void close() throws IOException {
78        IOUtils.close(reader);
79      }
80    }
81    
82    private MockDirectoryWrapper publishDir, handlerDir;
83    private Replicator replicator;
84    private SourceDirectoryFactory sourceDirFactory;
85    private ReplicationClient client;
86    private ReplicationHandler handler;
87    private IndexWriter publishWriter;
88    private IndexReadyCallback callback;
89    
90    private static final String VERSION_ID = "version";
91    
92    private void assertHandlerRevision(int expectedID, Directory dir) throws IOException {
93      // loop as long as client is alive. test-framework will terminate us if
94      // there's a serious bug, e.g. client doesn't really update. otherwise,
95      // introducing timeouts is not good, can easily lead to false positives.
96      while (client.isUpdateThreadAlive()) {
97        // give client a chance to update
98        try {
99          Thread.sleep(100);
100       } catch (InterruptedException e) {
101         throw new ThreadInterruptedException(e);
102       }
103 
104       try {
105         DirectoryReader reader = DirectoryReader.open(dir);
106         try {
107           int handlerID = Integer.parseInt(reader.getIndexCommit().getUserData().get(VERSION_ID), 16);
108           if (expectedID == handlerID) {
109             return;
110           } else if (VERBOSE) {
111             System.out.println("expectedID=" + expectedID + " actual=" + handlerID + " generation=" + reader.getIndexCommit().getGeneration());
112           }
113         } finally {
114           reader.close();
115         }
116       } catch (Exception e) {
117         // we can hit IndexNotFoundException or e.g. EOFException (on
118         // segments_N) because it is being copied at the same time it is read by
119         // DirectoryReader.open().
120       }
121     }
122   }
123   
124   private Revision createRevision(final int id) throws IOException {
125     publishWriter.addDocument(new Document());
126     publishWriter.setCommitData(new HashMap<String, String>() {{
127       put(VERSION_ID, Integer.toString(id, 16));
128     }});
129     publishWriter.commit();
130     return new IndexRevision(publishWriter);
131   }
132   
133   @Override
134   @Before
135   public void setUp() throws Exception {
136     super.setUp();
137     publishDir = newMockDirectory();
138     handlerDir = newMockDirectory();
139     sourceDirFactory = new PerSessionDirectoryFactory(createTempDir("replicationClientTest"));
140     replicator = new LocalReplicator();
141     callback = new IndexReadyCallback(handlerDir);
142     handler = new IndexReplicationHandler(handlerDir, callback);
143     client = new ReplicationClient(replicator, handler, sourceDirFactory);
144     
145     IndexWriterConfig conf = newIndexWriterConfig(null);
146     conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
147     publishWriter = new IndexWriter(publishDir, conf);
148   }
149   
150   @After
151   @Override
152   public void tearDown() throws Exception {
153     publishWriter.close();
154     IOUtils.close(client, callback, replicator, publishDir, handlerDir);
155     super.tearDown();
156   }
157   
158   @Test
159   public void testNoUpdateThread() throws Exception {
160     assertNull("no version expected at start", handler.currentVersion());
161     
162     // Callback validates the replicated index
163     replicator.publish(createRevision(1));
164     client.updateNow();
165     
166     replicator.publish(createRevision(2));
167     client.updateNow();
168     
169     // Publish two revisions without update, handler should be upgraded to latest
170     replicator.publish(createRevision(3));
171     replicator.publish(createRevision(4));
172     client.updateNow();
173   }
174   
175   @Test
176   public void testUpdateThread() throws Exception {
177     client.startUpdateThread(10, "index");
178     
179     replicator.publish(createRevision(1));
180     assertHandlerRevision(1, handlerDir);
181     
182     replicator.publish(createRevision(2));
183     assertHandlerRevision(2, handlerDir);
184     
185     // Publish two revisions without update, handler should be upgraded to latest
186     replicator.publish(createRevision(3));
187     replicator.publish(createRevision(4));
188     assertHandlerRevision(4, handlerDir);
189   }
190   
191   @Test
192   public void testRestart() throws Exception {
193     replicator.publish(createRevision(1));
194     client.updateNow();
195     
196     replicator.publish(createRevision(2));
197     client.updateNow();
198     
199     client.stopUpdateThread();
200     client.close();
201     client = new ReplicationClient(replicator, handler, sourceDirFactory);
202     
203     // Publish two revisions without update, handler should be upgraded to latest
204     replicator.publish(createRevision(3));
205     replicator.publish(createRevision(4));
206     client.updateNow();
207   }
208 
209   /*
210    * This test verifies that the client and handler do not end up in a corrupt
211    * index if exceptions are thrown at any point during replication. Either when
212    * a client copies files from the server to the temporary space, or when the
213    * handler copies them to the index directory.
214    */
215   @Test
216   public void testConsistencyOnExceptions() throws Exception {
217     // so the handler's index isn't empty
218     replicator.publish(createRevision(1));
219     client.updateNow();
220     client.close();
221     callback.close();
222     
223     // Replicator violates write-once policy. It may be that the
224     // handler copies files to the index dir, then fails to copy a
225     // file and reverts the copy operation. On the next attempt, it
226     // will copy the same file again. There is nothing wrong with this
227     // in a real system, but it does violate write-once, and MDW
228     // doesn't like it. Disabling it means that we won't catch cases
229     // where the handler overwrites an existing index file, but
230     // there's nothing currently we can do about it, unless we don't
231     // use MDW.
232     handlerDir.setPreventDoubleWrite(false);
233 
234     // wrap sourceDirFactory to return a MockDirWrapper so we can simulate errors
235     final SourceDirectoryFactory in = sourceDirFactory;
236     final AtomicInteger failures = new AtomicInteger(atLeast(10));
237     sourceDirFactory = new SourceDirectoryFactory() {
238       
239       private long clientMaxSize = 100, handlerMaxSize = 100;
240       private double clientExRate = 1.0, handlerExRate = 1.0;
241       
242       @Override
243       public void cleanupSession(String sessionID) throws IOException {
244         in.cleanupSession(sessionID);
245       }
246       
247       @SuppressWarnings("synthetic-access")
248       @Override
249       public Directory getDirectory(String sessionID, String source) throws IOException {
250         Directory dir = in.getDirectory(sessionID, source);
251         if (random().nextBoolean() && failures.get() > 0) { // client should fail, return wrapped dir
252           MockDirectoryWrapper mdw = new MockDirectoryWrapper(random(), dir);
253           mdw.setRandomIOExceptionRateOnOpen(clientExRate);
254           mdw.setMaxSizeInBytes(clientMaxSize);
255           mdw.setRandomIOExceptionRate(clientExRate);
256           mdw.setCheckIndexOnClose(false);
257           clientMaxSize *= 2;
258           clientExRate /= 2;
259           return mdw;
260         }
261 
262         if (failures.get() > 0 && random().nextBoolean()) { // handler should fail
263           handlerDir.setMaxSizeInBytes(handlerMaxSize);
264           handlerDir.setRandomIOExceptionRateOnOpen(handlerExRate);
265           handlerDir.setRandomIOExceptionRate(handlerExRate);
266           handlerMaxSize *= 2;
267           handlerExRate /= 2;
268         } else {
269           // disable errors
270           handlerDir.setMaxSizeInBytes(0);
271           handlerDir.setRandomIOExceptionRate(0.0);
272           handlerDir.setRandomIOExceptionRateOnOpen(0.0);
273         }
274         return dir;
275       }
276     };
277     
278     handler = new IndexReplicationHandler(handlerDir, new Callable<Boolean>() {
279       @Override
280       public Boolean call() throws Exception {
281         if (random().nextDouble() < 0.2 && failures.get() > 0) {
282           throw new RuntimeException("random exception from callback");
283         }
284         return null;
285       }
286     });
287     
288     // wrap handleUpdateException so we can act on the thrown exception
289     client = new ReplicationClient(replicator, handler, sourceDirFactory) {
290       @SuppressWarnings("synthetic-access")
291       @Override
292       protected void handleUpdateException(Throwable t) {
293         if (t instanceof IOException) {
294           if (VERBOSE) {
295             System.out.println("hit exception during update: " + t);
296             t.printStackTrace(System.out);
297           }
298           try {
299             // test that the index can be read and also some basic statistics
300             DirectoryReader reader = DirectoryReader.open(handlerDir.getDelegate());
301             try {
302               int numDocs = reader.numDocs();
303               int version = Integer.parseInt(reader.getIndexCommit().getUserData().get(VERSION_ID), 16);
304               assertEquals(numDocs, version);
305             } finally {
306               reader.close();
307             }
308             // verify index consistency
309             TestUtil.checkIndex(handlerDir.getDelegate());
310           } catch (IOException e) {
311             // exceptions here are bad, don't ignore them
312             throw new RuntimeException(e);
313           } finally {
314             // count-down number of failures
315             failures.decrementAndGet();
316             assert failures.get() >= 0 : "handler failed too many times: " + failures.get();
317             if (VERBOSE) {
318               if (failures.get() == 0) {
319                 System.out.println("no more failures expected");
320               } else {
321                 System.out.println("num failures left: " + failures.get());
322               }
323             }
324           }
325         } else {
326           if (t instanceof RuntimeException) throw (RuntimeException) t;
327           throw new RuntimeException(t);
328         }
329       }
330     };
331     
332     client.startUpdateThread(10, "index");
333 
334     final Directory baseHandlerDir = handlerDir.getDelegate();
335     int numRevisions = atLeast(20);
336     for (int i = 2; i < numRevisions; i++) {
337       replicator.publish(createRevision(i));
338       assertHandlerRevision(i, baseHandlerDir);
339     }
340     
341     // disable errors -- maybe randomness didn't exhaust all allowed failures,
342     // and we don't want e.g. CheckIndex to hit false errors. 
343     handlerDir.setMaxSizeInBytes(0);
344     handlerDir.setRandomIOExceptionRate(0.0);
345     handlerDir.setRandomIOExceptionRateOnOpen(0.0);
346   }
347   
348 }